package com.yxsk.relay.job.component.endpoint.facade;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.yxsk.relay.job.component.common.constant.UriConstant;
import com.yxsk.relay.job.component.common.exception.remote.RemoteCallException;
import com.yxsk.relay.job.component.common.protocol.caller.RemoteCaller;
import com.yxsk.relay.job.component.common.protocol.message.base.ResultResponse;
import com.yxsk.relay.job.component.common.protocol.message.collect.ExecuteResultCollectRequest;
import com.yxsk.relay.job.component.common.protocol.message.collect.ExecuteResultCollectResponse;
import com.yxsk.relay.job.component.common.protocol.message.execute.JobExecuteRequest;
import com.yxsk.relay.job.component.common.protocol.message.execute.JobExecuteResponse;
import com.yxsk.relay.job.component.common.protocol.message.execute.async.JobExecuteCallbackRequest;
import com.yxsk.relay.job.component.common.protocol.message.execute.async.JobExecuteCallbackResponse;
import com.yxsk.relay.job.component.common.protocol.message.execute.async.query.AsyncJobQueryRequest;
import com.yxsk.relay.job.component.common.protocol.message.execute.async.query.AsyncJobQueryResponse;
import com.yxsk.relay.job.component.common.protocol.message.log.delete.LogDeleteRequestDto;
import com.yxsk.relay.job.component.common.protocol.message.log.delete.LogDeleteResponseDto;
import com.yxsk.relay.job.component.common.protocol.message.log.query.LogQueryRequestDto;
import com.yxsk.relay.job.component.common.protocol.message.log.query.LogQueryResponseDto;
import com.yxsk.relay.job.component.common.protocol.message.partition.JobPartitionRequest;
import com.yxsk.relay.job.component.common.protocol.message.partition.JobPartitionResponse;
import com.yxsk.relay.job.component.common.utils.CollectionUtils;
import com.yxsk.relay.job.component.common.utils.DateUtils;
import com.yxsk.relay.job.component.common.utils.ExceptionUtils;
import com.yxsk.relay.job.component.common.utils.SerialNoUtils;
import com.yxsk.relay.job.component.common.vo.ExecuteResult;
import com.yxsk.relay.job.component.endpoint.async.AsyncTask;
import com.yxsk.relay.job.component.endpoint.async.RelayJobAsyncWorker;
import com.yxsk.relay.job.component.endpoint.config.RelayJobAdminConfig;
import com.yxsk.relay.job.component.endpoint.context.RelayJobNetContextHolder;
import com.yxsk.relay.job.component.endpoint.exception.RelayJobEndpointException;
import com.yxsk.relay.job.component.endpoint.exception.facade.JobExecutorNotFoundException;
import com.yxsk.relay.job.component.endpoint.exception.facade.UnsupportedCollectException;
import com.yxsk.relay.job.component.endpoint.exception.facade.UnsupportedPartitionException;
import com.yxsk.relay.job.component.endpoint.handle.Collectable;
import com.yxsk.relay.job.component.endpoint.handle.JobExecutor;
import com.yxsk.relay.job.component.endpoint.handle.Partitionable;
import com.yxsk.relay.job.component.endpoint.handle.context.RelayJobExecutorManager;
import com.yxsk.relay.job.component.endpoint.log.stream.LogStream;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import javax.validation.Valid;
import java.util.*;
import java.util.concurrent.ExecutorService;

/**
 * @Author 11376
 * @CreaTime 2019/6/15 16:49
 * @Description
 */
@Slf4j
@Builder
public class DefaultJobHeadman implements JobHeadman {

    public static final String RELAY_JOB_ERROR_CODE_PREFIX = "RELAY-JOB-ERROR-CODE-";

    private ExecutorService taskExecutor;

    private RemoteCaller remoteCaller;

    private RelayJobAdminConfig adminConfig;

    private LogStream logStream;

    // 任务信息
    private Map<String, AsyncTask> tasks;

    // 回调处理器
    private JobResultCallback resultCallback;

    public void setResultCallback(JobResultCallback resultCallback) {
        this.resultCallback = resultCallback;
    }

    @Override
    public ResultResponse<JobExecuteResponse> execute(@Valid JobExecuteRequest request) {

        // 实例化响应对象
        JobExecuteResponse response = new JobExecuteResponse();
        response.setRequestSerialNo(request.getSerialNo());
        String respCode;

        try {
            JobExecutor executor = RelayJobExecutorManager.getNewExecutor(request.getHandleName());
            String result = executor.execute(request.getParam());

            // 组装响应报文
            response.setExecuteStatus(ExecuteResult.ExecuteResultCode.OK.getCode());
            response.setResult(result);
            response.setRespTime(DateUtils.getCurrentDate());

            return ResultResponse.ok(response);
        } catch (JobExecutorNotFoundException e) {
            log.error("获取执行器错误", e);
            response.setExecuteStatus(ExecuteResult.ExecuteResultCode.EXECUTOR_NOT_FOUND.getCode());
            response.setErrorMsg(e.getMessage());

            respCode = RELAY_JOB_ERROR_CODE_PREFIX + ExecuteResult.ExecuteResultCode.EXECUTOR_NOT_FOUND.getCode();
        } catch (Exception e) {
            log.error("执行任务错误", e);
            response.setExecuteStatus(ExecuteResult.ExecuteResultCode.ERROR.getCode());
            response.setErrorMsg(ExceptionUtils.getStackInfo(e));

            respCode = RELAY_JOB_ERROR_CODE_PREFIX + ExecuteResult.ExecuteResultCode.ERROR.getCode();
        }

        response.setRespTime(DateUtils.getCurrentDate());
        return ResultResponse.fail(respCode, response);
    }

    @Override
    public ResultResponse<JobExecuteResponse> asyncExecute(@Valid JobExecuteRequest executeRequest) {
        JobExecuteResponse response = new JobExecuteResponse();
        response.setRequestSerialNo(executeRequest.getSerialNo());

        String respCode;
        try {
            // 执行器
            JobExecutor executor = RelayJobExecutorManager.getNewExecutor(executeRequest.getHandleName());
            // 异步任务
            AsyncTask asyncTask = buildAsyncTask(executeRequest, executor);
            // 添加任务信息
            this.tasks.put(executeRequest.getJobId(), asyncTask);
            // 提交异步任务
            this.taskExecutor.submit(asyncTask);

            response.setExecuteStatus(ExecuteResult.ExecuteResultCode.EXECUTING.getCode());
            response.setRespTime(DateUtils.getCurrentDate());
            return ResultResponse.ok(response);
        } catch (JobExecutorNotFoundException e) {
            log.error("获取执行器错误", e);
            response.setExecuteStatus(ExecuteResult.ExecuteResultCode.EXECUTOR_NOT_FOUND.getCode());
            response.setErrorMsg(e.getMessage());

            respCode = RELAY_JOB_ERROR_CODE_PREFIX + ExecuteResult.ExecuteResultCode.EXECUTOR_NOT_FOUND.getCode();
        } catch (RelayJobEndpointException e) {
            log.error("任务执行异常", e);
            response.setExecuteStatus(ExecuteResult.ExecuteResultCode.EXECUTE_ERROR.getCode());
            response.setErrorMsg(e.getMessage());

            respCode = RELAY_JOB_ERROR_CODE_PREFIX + ExecuteResult.ExecuteResultCode.EXECUTE_ERROR.getCode();
        } catch (Exception e) {
            log.error("执行任务错误", e);
            response.setExecuteStatus(ExecuteResult.ExecuteResultCode.ERROR.getCode());
            response.setErrorMsg(ExceptionUtils.getStackInfo(e));

            respCode = RELAY_JOB_ERROR_CODE_PREFIX + ExecuteResult.ExecuteResultCode.ERROR.getCode();
        }

        response.setRespTime(DateUtils.getCurrentDate());
        return ResultResponse.fail(respCode, response);
    }

    @Override
    public ResultResponse<JobPartitionResponse> jobPartition(@Valid JobPartitionRequest partitionRequest) {
        JobPartitionResponse response = new JobPartitionResponse();
        response.setRequestSerialNo(partitionRequest.getSerialNo());

        String respCode;
        String errorMsg;
        try {
            // 执行器
            JobExecutor executor = RelayJobExecutorManager.getNewExecutor(partitionRequest.getHandleName());
            if (executor instanceof Partitionable) {
                Partitionable partition = (Partitionable) executor;
                response.setPartitionInfoList(partition.partition(partitionRequest.getEndpoints(), partitionRequest.getExecuteParam()));
            } else {
                throw new UnsupportedPartitionException("Job executor does not support partitioning, class name: " + executor.getClass());
            }
            return ResultResponse.ok(response);
        } catch (JobExecutorNotFoundException e) {
            log.error("获取执行器错误", e);

            errorMsg = ExceptionUtils.getStackInfo(e);
            respCode = RELAY_JOB_ERROR_CODE_PREFIX + ExecuteResult.ExecuteResultCode.EXECUTOR_NOT_FOUND.getCode();
        } catch (UnsupportedPartitionException e) {
            log.error("", e);

            errorMsg = ExceptionUtils.getStackInfo(e);
            respCode = RELAY_JOB_ERROR_CODE_PREFIX + ExecuteResult.ExecuteResultCode.JOB_PARTITIONING_ERROR.getCode();
        } catch (Exception e) {
            log.error("任务执行分区错误", e);

            errorMsg = ExceptionUtils.getStackInfo(e);
            respCode = RELAY_JOB_ERROR_CODE_PREFIX + ExecuteResult.ExecuteResultCode.JOB_PARTITIONING_ERROR.getCode();
        }

        response.setRespTime(DateUtils.getCurrentDate());
        return ResultResponse.fail(respCode, errorMsg, response);
    }

    @Override
    public ResultResponse<ExecuteResultCollectResponse> resultCollect(@Valid ExecuteResultCollectRequest resultCollectRequest) {
        ExecuteResultCollectResponse response = new ExecuteResultCollectResponse();
        response.setRequestSerialNo(resultCollectRequest.getSerialNo());

        String respCode;
        String errorMsg;
        try {
            // 执行器
            JobExecutor executor = RelayJobExecutorManager.getNewExecutor(resultCollectRequest.getHandleName());
            if (executor instanceof Collectable) {
                Collectable collector = (Collectable) executor;
                String result = collector.collect(resultCollectRequest.getExecuteResults());
                response.setResult(result);
            } else {
                throw new UnsupportedCollectException("Job executor does not support collecting, class name: " + executor.getClass());
            }
            return ResultResponse.ok(response);
        } catch (JobExecutorNotFoundException e) {
            log.error("获取执行器错误", e);

            errorMsg = ExceptionUtils.getStackInfo(e);
            respCode = RELAY_JOB_ERROR_CODE_PREFIX + ExecuteResult.ExecuteResultCode.EXECUTOR_NOT_FOUND.getCode();
        } catch (UnsupportedCollectException e) {
            log.error("", e);

            errorMsg = ExceptionUtils.getStackInfo(e);
            respCode = RELAY_JOB_ERROR_CODE_PREFIX + ExecuteResult.ExecuteResultCode.UNSUPPORTED_COLLECT_ERROR.getCode();
        } catch (RelayJobEndpointException e) {
            log.error("任务结果收集错误", e);

            errorMsg = ExceptionUtils.getStackInfo(e);
            respCode = RELAY_JOB_ERROR_CODE_PREFIX + ExecuteResult.ExecuteResultCode.JOB_COLLECT_ERROR.getCode();
        } catch (Exception e) {
            log.error("任务执行结果收集处理错误", e);

            errorMsg = ExceptionUtils.getStackInfo(e);
            respCode = RELAY_JOB_ERROR_CODE_PREFIX + ExecuteResult.ExecuteResultCode.JOB_COLLECT_ERROR.getCode();
        }

        response.setRespTime(DateUtils.getCurrentDate());
        return ResultResponse.fail(respCode, errorMsg, response);
    }

    @Override
    public ResultResponse<AsyncJobQueryResponse> queryExecuteResult(@Valid AsyncJobQueryRequest queryRequest) {
        AsyncJobQueryResponse response = new AsyncJobQueryResponse();
        response.setRequestSerialNo(queryRequest.getSerialNo());

        AsyncTask asyncTask = this.tasks.get(queryRequest.getJobId());
        if (asyncTask == null) {
            response.setResultCode(ExecuteResult.ExecuteResultCode.JOB_NOT_FOUND.getCode());
            response.setErrorMsg("Execute log not found.");
        } else {
            switch (asyncTask.getCycle()) {
                case FINISH:
                    response.setResultCode(ExecuteResult.ExecuteResultCode.OK.getCode());
                    response.setResult(asyncTask.getResult());
                    this.tasks.remove(queryRequest.getJobId());
                    break;
                case ERROR:
                    response.setResultCode(ExecuteResult.ExecuteResultCode.ERROR.getCode());
                    response.setErrorMsg(ExceptionUtils.getStackInfo(asyncTask.getError()));
                    this.tasks.remove(queryRequest.getJobId());
                    break;
                default:
                    response.setResultCode(ExecuteResult.ExecuteResultCode.EXECUTING.getCode());
                    break;
            }
        }

        response.setRespTime(DateUtils.getCurrentDate());
        return ResultResponse.ok(response);
    }

    @Override
    public ResultResponse<LogQueryResponseDto> queryLog(@Valid LogQueryRequestDto queryRequest) {
        LogQueryResponseDto response = new LogQueryResponseDto();
        response.setRequestSerialNo(queryRequest.getSerialNo());

        List<String> content;
        Long startLine = queryRequest.getStartLine();
        Long endLine = queryRequest.getEndLine();
        if (startLine == null) {
            content = this.logStream.read(queryRequest.getLogId());
        } else if (startLine != null && endLine == null) {
            content = this.logStream.read(queryRequest.getLogId(), startLine);
        } else {
            content = this.logStream.read(queryRequest.getLogId(), startLine, endLine);
        }
        response.setLogs(content);
        response.setTotalLine(this.logStream.getTotalRow(queryRequest.getLogId()));
        response.setRespTime(DateUtils.getCurrentDate());
        return ResultResponse.ok(response);
    }

    @Override
    public ResultResponse<LogDeleteResponseDto> deleteLog(@Valid LogDeleteRequestDto deleteRequestDto) {
        LogDeleteResponseDto response = new LogDeleteResponseDto();
        response.setRequestSerialNo(deleteRequestDto.getSerialNo());

        this.logStream.delete(deleteRequestDto.getLogId());

        response.setRespTime(DateUtils.getCurrentDate());
        return ResultResponse.ok(response);
    }

    public class JobResultCallback implements AsyncTask.ExecuteCallback {

        @Override
        public void callback(JobExecuteRequest executeRequest, String result) {
            // 获取通知的 admin 主机
            RelayJobAdminConfig.AddressInfo address = getAddressInfo();
            // 组装请求参数
            JobExecuteCallbackRequest callbackRequest = new JobExecuteCallbackRequest();
            callbackRequest.setJobId(executeRequest.getJobId());
            callbackRequest.setResult(result);
            callbackRequest.setResultCode(ExecuteResult.ExecuteResultCode.OK.getCode());
            callbackRequest.setRequestTime(executeRequest.getRequestTime());
            callbackRequest.setSerialNo(SerialNoUtils.nextId());
            callbackRequest.setVersion(UriConstant.VERSION_NO);

            doRequest(callbackRequest, address);
        }

        private RelayJobAdminConfig.AddressInfo getAddressInfo() {
            List<RelayJobAdminConfig.AddressInfo> addresses = adminConfig.getAddresses();
            if (CollectionUtils.isEmpty(addresses)) {
                throw new IllegalStateException("Not found relay job admin address.");
            }
            return addresses.get(new Random().nextInt(addresses.size()));
        }

        private void doRequest(JobExecuteCallbackRequest callbackRequest, RelayJobAdminConfig.AddressInfo address) {
            try {
                ResultResponse<JobExecuteCallbackResponse> response = this.call(callbackRequest, address);
                if (response == null) {
                    log.error("Relay job execute callback error, response is null");
                    return;
                }
                if (ResultResponse.isOk(response)) {
                    // 回调成功, 删除任务信息
                    tasks.remove(callbackRequest.getJobId());
                    if (log.isDebugEnabled()) {
                        log.debug("Relay job execute callback success, job id: {}", callbackRequest.getJobId());
                    }
                } else {
                    if (log.isErrorEnabled()) {
                        log.error("Relay job execute callback error, response code: {}, message: {}, data: {}", response.getCode(), response.getMessage(), response.getData());
                    }
                }
            } catch (RemoteCallException e) {
                log.error("Relay job executed notify admin error. request: " + callbackRequest, e);
            }
        }

        @Override
        public void onError(JobExecuteRequest executeRequest, Throwable e) {
            // 获取通知的 admin 主机
            RelayJobAdminConfig.AddressInfo address = getAddressInfo();
            // 组装请求参数
            JobExecuteCallbackRequest callbackRequest = new JobExecuteCallbackRequest();
            callbackRequest.setJobId(executeRequest.getJobId());
            callbackRequest.setErrorMsg(ExceptionUtils.getStackInfo(e));
            callbackRequest.setResultCode(ExecuteResult.ExecuteResultCode.ERROR.getCode());
            callbackRequest.setRequestTime(executeRequest.getRequestTime());
            callbackRequest.setSerialNo(SerialNoUtils.nextId());
            callbackRequest.setVersion(UriConstant.VERSION_NO);

            doRequest(callbackRequest, address);
        }

        private ResultResponse<JobExecuteCallbackResponse> call(JobExecuteCallbackRequest callbackRequest, RelayJobAdminConfig.AddressInfo address) throws RemoteCallException {
            try {
                Map<String, String> header = null;
                if (StringUtils.isNotEmpty(address.getToken())) {
                    header = new HashMap<>(2);
                    header.put(UriConstant.AUTHORIZATION_HEADER_KEY, address.getToken());
                }
                String resp = remoteCaller.call(callbackRequest, header, buildUrl(address), String.class);
                return JSONObject.parseObject(resp, new TypeReference<ResultResponse<JobExecuteCallbackResponse>>() {
                });
            } catch (Exception e) {
                throw new RemoteCallException(e);
            }
        }

        private String buildUrl(RelayJobAdminConfig.AddressInfo address) {
            return new StringBuilder("http://")
                    .append(address.getHost())
                    .append(":")
                    .append(address.getPort())
                    .append(UriConstant.TASK_ASYNC_EXECUTE_RESULT_CALLBACK_URI).toString();
        }

    }

    private AsyncTask buildAsyncTask(JobExecuteRequest executeRequest, JobExecutor executor) {
        AsyncTask<Object> asyncTask = AsyncTask.builder()
                .callback(resultCallback)
                .jobExecutor(executor)
                .request(executeRequest)
                .build();
        // 注入请求上下文拦截器
        List<RelayJobAsyncWorker.WorkInterceptor> interceptors = new LinkedList<>();
        interceptors.add(new RelayJobNetContextHolder.AsyncTaskWorkInterceptor());
        asyncTask.addInterceptors(interceptors);
        return asyncTask;
    }

}
